import { Agent, AgentOutputType } from './agent';
import { Handoff } from './handoff';
import {
  ResolvedAgentOutput,
  HandoffsOutput,
  AgentInputItem,
  AgentOutputItem,
} from './types';
import { RunItem, RunToolApprovalItem } from './items';
import { ModelResponse } from './model';
import {
  ReadableStreamController,
  ReadableStream as _ReadableStream,
  TransformStream,
  Readable,
} from '@openai/agents-core/_shims';
import { ReadableStream } from './shims/interface';
import { RunStreamEvent } from './events';
import { getTurnInput } from './run';
import { RunState } from './runState';
import type { InputGuardrailResult, OutputGuardrailResult } from './guardrail';
import logger from './logger';
import { StreamEventTextStream } from './types/protocol';

/**
 * Data returned by the run() method of an agent.
 */
export interface RunResultData<
  TAgent extends Agent<any, any>,
  THandoffs extends (Agent<any, any> | Handoff<any>)[] = any[],
> {
  /**
   * The original input items i.e. the items before run() was called. This may be mutated version
   * of the input, if there are handoff input filters that mutate the input.
   */
  input: string | AgentInputItem[];

  /**
   * The new items generated during the agent run. These include things like new messages, tool
   * calls and their outputs, etc.
   */
  newItems: RunItem[];

  /**
   * The raw LLM responses generated by the model during the agent run.
   */
  rawResponses: ModelResponse[];

  /**
   * The last response ID generated by the model during the agent run.
   */
  lastResponseId: string | undefined;

  /**
   * The last agent that was run
   */
  lastAgent: TAgent | undefined;

  /**
   * Guardrail results for the input messages.
   */
  inputGuardrailResults: InputGuardrailResult[];

  /**
   * Guardrail results for the final output of the agent.
   */
  outputGuardrailResults: OutputGuardrailResult[];

  /**
   * The output of the last agent, or any handoff agent.
   */
  finalOutput?:
    | ResolvedAgentOutput<TAgent['outputType']>
    | HandoffsOutput<THandoffs>;

  /**
   * The interruptions that occurred during the agent run.
   */
  interruptions?: RunToolApprovalItem[];

  /**
   * The state of the run.
   */
  state: RunState<any, TAgent>;
}

class RunResultBase<TContext, TAgent extends Agent<TContext, any>>
  implements RunResultData<TAgent>
{
  readonly state: RunState<TContext, TAgent>;

  constructor(state: RunState<TContext, TAgent>) {
    this.state = state;
  }

  /**
   * The history of the agent run. This includes the input items and the new items generated during
   * the agent run.
   *
   * This can be used as inputs for the next agent run.
   */
  get history(): AgentInputItem[] {
    return getTurnInput(this.input, this.newItems);
  }

  /**
   * The new items generated during the agent run. These include things like new messages, tool
   * calls and their outputs, etc.
   *
   * It does not include information about the agents and instead represents the model data.
   *
   * For the output including the agents, use the `newItems` property.
   */
  get output(): AgentOutputItem[] {
    return getTurnInput([], this.newItems);
  }

  /**
   * A copy of the original input items.
   */
  get input(): string | AgentInputItem[] {
    return this.state._originalInput;
  }

  /**
   * The run items generated during the agent run. This associates the model data with the agents.
   *
   * For the model data that can be used as inputs for the next agent run, use the `output` property.
   */
  get newItems(): RunItem[] {
    return this.state._generatedItems;
  }

  /**
   * The raw LLM responses generated by the model during the agent run.
   */
  get rawResponses(): ModelResponse[] {
    return this.state._modelResponses;
  }

  /**
   * The last response ID generated by the model during the agent run.
   */
  get lastResponseId(): string | undefined {
    const responses = this.rawResponses;
    return responses && responses.length > 0
      ? responses[responses.length - 1].responseId
      : undefined;
  }

  /**
   * The last agent that was run
   */
  get lastAgent(): TAgent | undefined {
    return this.state._currentAgent;
  }

  /**
   * Guardrail results for the input messages.
   */
  get inputGuardrailResults(): InputGuardrailResult[] {
    return this.state._inputGuardrailResults;
  }

  /**
   * Guardrail results for the final output of the agent.
   */
  get outputGuardrailResults(): OutputGuardrailResult[] {
    return this.state._outputGuardrailResults;
  }

  /**
   * Any interruptions that occurred during the agent run for example for tool approvals.
   */
  get interruptions(): RunToolApprovalItem[] {
    if (this.state._currentStep?.type === 'next_step_interruption') {
      return this.state._currentStep.data.interruptions;
    }

    return [];
  }

  /**
   * The final output of the agent. If the output type was set to anything other than `text`,
   * this will be parsed either as JSON or using the Zod schema you provided.
   */
  get finalOutput(): ResolvedAgentOutput<TAgent['outputType']> | undefined {
    if (this.state._currentStep?.type === 'next_step_final_output') {
      return this.state._currentAgent.processFinalOutput(
        this.state._currentStep.output,
      ) as ResolvedAgentOutput<TAgent['outputType']>;
    }

    logger.warn('Accessed finalOutput before agent run is completed.');
    return undefined;
  }
}

/**
 * The result of an agent run.
 */
export class RunResult<
  TContext,
  TAgent extends Agent<TContext, AgentOutputType>,
> extends RunResultBase<TContext, TAgent> {
  constructor(state: RunState<TContext, TAgent>) {
    super(state);
  }
}

/**
 * The result of an agent run in streaming mode.
 */
export class StreamedRunResult<
    TContext,
    TAgent extends Agent<TContext, AgentOutputType>,
  >
  extends RunResultBase<TContext, TAgent>
  implements AsyncIterable<RunStreamEvent>
{
  /**
   * The current agent that is running
   */
  public get currentAgent(): TAgent | undefined {
    return this.lastAgent;
  }

  /**
   * The current turn number
   */
  public currentTurn: number = 0;

  /**
   * The maximum number of turns that can be run
   */
  public maxTurns: number | undefined;

  #error: unknown = null;
  #signal?: AbortSignal;
  #readableController: ReadableStreamController<RunStreamEvent> | undefined;
  #readableStream: _ReadableStream<RunStreamEvent>;
  #completedPromise: Promise<void>;
  #completedPromiseResolve: (() => void) | undefined;
  #completedPromiseReject: ((err: unknown) => void) | undefined;
  #cancelled: boolean = false;
  #streamLoopPromise: Promise<void> | undefined;

  constructor(
    result: {
      state: RunState<TContext, TAgent>;
      signal?: AbortSignal;
    } = {} as any,
  ) {
    super(result.state);

    this.#signal = result.signal;

    this.#readableStream = new _ReadableStream<RunStreamEvent>({
      start: (controller) => {
        this.#readableController = controller;
      },
      cancel: () => {
        this.#cancelled = true;
      },
    });

    this.#completedPromise = new Promise((resolve, reject) => {
      this.#completedPromiseResolve = resolve;
      this.#completedPromiseReject = reject;
    });

    if (this.#signal) {
      const handleAbort = () => {
        if (this.#cancelled) {
          return;
        }

        this.#cancelled = true;

        const controller = this.#readableController;
        this.#readableController = undefined;

        if (this.#readableStream.locked) {
          if (controller) {
            try {
              controller.close();
            } catch (err) {
              logger.debug(`Failed to close readable stream on abort: ${err}`);
            }
          }
        } else {
          void this.#readableStream
            .cancel(this.#signal?.reason)
            .catch((err) => {
              logger.debug(`Failed to cancel readable stream on abort: ${err}`);
            });
        }

        this.#completedPromiseResolve?.();
      };

      if (this.#signal.aborted) {
        handleAbort();
      } else {
        this.#signal.addEventListener('abort', handleAbort, { once: true });
      }
    }
  }

  /**
   * @internal
   * Adds an item to the stream of output items
   */
  _addItem(item: RunStreamEvent) {
    if (!this.cancelled) {
      this.#readableController?.enqueue(item);
    }
  }

  /**
   * @internal
   * Indicates that the stream has been completed
   */
  _done() {
    if (!this.cancelled && this.#readableController) {
      this.#readableController.close();
      this.#readableController = undefined;
      this.#completedPromiseResolve?.();
    }
  }

  /**
   * @internal
   * Handles an error in the stream loop.
   */
  _raiseError(err: unknown) {
    if (!this.cancelled && this.#readableController) {
      this.#readableController.error(err);
      this.#readableController = undefined;
    }
    this.#error = err;
    this.#completedPromiseReject?.(err);
    this.#completedPromise.catch((e) => {
      logger.debug(`Resulted in an error: ${e}`);
    });
  }

  /**
   * Returns true if the stream has been cancelled.
   */
  get cancelled(): boolean {
    return this.#cancelled;
  }

  /**
   * Returns the underlying readable stream.
   * @returns A readable stream of the agent run.
   */
  toStream(): ReadableStream<RunStreamEvent> {
    return this.#readableStream as ReadableStream<RunStreamEvent>;
  }

  /**
   * Await this promise to ensure that the stream has been completed if you are not consuming the
   * stream directly.
   */
  get completed() {
    return this.#completedPromise;
  }

  /**
   * Error thrown during the run, if any.
   */
  get error() {
    return this.#error;
  }

  /**
   * Returns a readable stream of the final text output of the agent run.
   *
   * @returns A readable stream of the final output of the agent run.
   * @remarks Pass `{ compatibleWithNodeStreams: true }` to receive a Node.js compatible stream
   * instance.
   */
  toTextStream(): ReadableStream<string>;
  toTextStream(options?: { compatibleWithNodeStreams: true }): Readable;
  toTextStream(options?: {
    compatibleWithNodeStreams?: false;
  }): ReadableStream<string>;
  toTextStream(
    options: { compatibleWithNodeStreams?: boolean } = {},
  ): Readable | ReadableStream<string> {
    const stream = this.#readableStream.pipeThrough(
      new TransformStream<RunStreamEvent, string>({
        transform(event, controller) {
          if (
            event.type === 'raw_model_stream_event' &&
            event.data.type === 'output_text_delta'
          ) {
            const item = StreamEventTextStream.parse(event.data);
            controller.enqueue(item.delta);
          }
        },
      }),
    );

    if (options.compatibleWithNodeStreams) {
      return Readable.fromWeb(stream);
    }

    return stream as ReadableStream<string>;
  }

  [Symbol.asyncIterator](): AsyncIterator<RunStreamEvent> {
    return this.#readableStream[Symbol.asyncIterator]();
  }

  /**
   * @internal
   * Sets the stream loop promise that completes when the internal stream loop finishes.
   * This is used to defer trace end until all agent work is complete.
   */
  _setStreamLoopPromise(promise: Promise<void>) {
    this.#streamLoopPromise = promise;
  }

  /**
   * @internal
   * Returns a promise that resolves when the stream loop completes.
   * This is used by the tracing system to wait for all agent work before ending the trace.
   */
  _getStreamLoopPromise(): Promise<void> | undefined {
    return this.#streamLoopPromise;
  }
}
